package com.uxsino.commons.db.queue;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
 * Database queue facade.
 *
 * <p>Every queue operation is delegated to {@link DBQueueService}; this class adds
 * process-wide locking around those calls and a per-queue monitor that lets a
 * consumer sleep for a bounded time when a poll comes back empty and be woken
 * early by the next {@link #push(String, Collection)} on the same queue.
 *
 * <p>Locking overview:
 * <ul>
 *   <li>{@code LOCKER_POLL} serializes polls, {@code LOCKER_PUSH} serializes pushes;
 *       {@link #clean(String)} takes both (poll first, then push).</li>
 *   <li>The objects returned by {@link #getLocker(String)} are used purely as
 *       intrinsic monitors ({@code synchronized}/{@code wait}/{@code notifyAll});
 *       this class never calls {@code lock()} on them.</li>
 * </ul>
 *
 * @author <a href="mailto:royrxc@gmail.com">Ran</a>
 * @since 2019/12/14 10:41
 */
@Service
public class DBQueue {
    private static final Logger LOG = LoggerFactory.getLogger(DBQueue.class);

    /**
     * Cache mode of a queue. Only stored in {@code MEM_STATE}; nothing in this
     * class currently reads it.
     */
    public enum CacheState{
        /**
         * Automatic switching.
         */
        AUTO,
        /**
         * In-memory queue.
         */
        MEM,
        /**
         * Database queue.
         */
        DB;
    }

    @Autowired
    DBQueueService dbQueueService;

    /** Global lock held around every poll against the queue service. */
    private static final ReentrantLock LOCKER_POLL = new ReentrantLock();
    /** Global lock held around every push against the queue service. */
    private static final ReentrantLock LOCKER_PUSH = new ReentrantLock();

    /** In-memory queues by name; only reported through {@link #queueSize()}. */
    private static final ConcurrentHashMap<String, LinkedBlockingQueue<?>> MEM_QUEUE = new ConcurrentHashMap<>();
    /** Cache mode by queue name. */
    private static final ConcurrentHashMap<String, CacheState> MEM_STATE = new ConcurrentHashMap<>();

    /**
     * Returns the in-memory queue registered under {@code name}, creating it with
     * the given capacity if none exists yet.
     *
     * @param name    queue name
     * @param capcity capacity used only when a new queue has to be created
     * @return the existing or newly created queue
     */
    private LinkedBlockingQueue<?> create(String name, int capcity){
        synchronized (MEM_QUEUE){
            // computeIfAbsent only allocates when the queue is really missing.
            return MEM_QUEUE.computeIfAbsent(name, key -> new LinkedBlockingQueue<Object>(capcity));
        }
    }

    /**
     * Unregisters the in-memory queue {@code name} and discards its content.
     *
     * @param name queue name
     */
    private void remove(String name){
        synchronized (MEM_QUEUE){
            LinkedBlockingQueue<?> removed = MEM_QUEUE.remove(name);
            if(removed != null){
                removed.clear();
            }
        }
    }

    /**
     * Reports the current size of every registered in-memory queue.
     *
     * @return a new map from queue name to element count
     */
    public Map<String, Integer> queueSize(){
        Map<String, Integer> result = Maps.newHashMap();
        MEM_QUEUE.forEach((queueName, queue) -> result.put(queueName, queue.size()));
        return result;
    }

    /**
     * Empty-queue notification monitors, one per queue name.
     */
    ConcurrentHashMap<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();


    /**
     * Blocks the calling thread on the monitor of queue {@code name} until it is
     * notified or {@code timeout} milliseconds have passed.
     *
     * <p>The timeout is handed to {@link Object#wait(long)} unchanged, so {@code 0}
     * waits until notified and a negative value is rejected by that method.
     * A notification sent before this method is entered is not remembered.
     *
     * @param name    queue name
     * @param timeout maximum wait in milliseconds
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void wait(String name, int timeout) throws InterruptedException{
        final ReentrantLock monitor = getLocker(name);
        synchronized (monitor){
            monitor.wait(timeout);
        }
    }

    /**
     * Wakes every thread currently blocked in {@link #wait(String, int)} for queue
     * {@code name}.
     *
     * @param name queue name
     */
    public void notify(String name){
        final ReentrantLock monitor = getLocker(name);
        synchronized (monitor){
            monitor.notifyAll();
        }
    }

    /**
     * Returns the monitor object for queue {@code name}, creating it on first use.
     * The same instance is returned for the same name on every call.
     *
     * @param name queue name
     * @return the monitor associated with {@code name}
     */
    public ReentrantLock getLocker(String name){
        // Atomic on ConcurrentHashMap; no external synchronization required.
        return LOCKS.computeIfAbsent(name, key -> new ReentrantLock());
    }

    /**
     * Wakes the waiters of every known queue monitor.
     */
    private void notifyAllLocker(){
        for (ReentrantLock monitor : LOCKS.values()) {
            synchronized (monitor){
                monitor.notifyAll();
            }
        }
    }

    /**
     * Runs {@code dbQueueService.check()} once after dependency injection.
     */
    @PostConstruct
    private void init(){
        dbQueueService.check();
    }

    /**
     * Runs {@code dbQueueService.check()} at 00:00:00 on the 1st, 10th and 20th
     * day of every month.
     */
    @Scheduled(cron = "0 0 0 1,10,20 * ?")
    public void check(){
        dbQueueService.check();
    }

    /**
     * Polls queue {@code name}, deserializes each entry, and sleeps for up to
     * {@code emptySleepMs} milliseconds when nothing was returned.
     *
     * <p>The sleep happens without holding this instance's monitor or the poll
     * lock, so an idle consumer does not stall polls of other queues.
     *
     * @param name         queue name
     * @param size         passed through to the queue service (presumably the
     *                     maximum number of entries to fetch; confirm there)
     * @param reference    fastjson type token for the element type
     * @param emptySleepMs time to wait after an empty poll; {@code <= 0} disables waiting
     * @return the deserialized entries; empty (never {@code null}) when nothing was polled
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> List<T> poll(String name, int size, TypeReference<T> reference, int emptySleepMs) throws InterruptedException{
        List<T> data = poll(name, size, reference);
        if(data.isEmpty() && emptySleepMs > 0){
            wait(name, emptySleepMs);
        }
        return data;
    }

    /**
     * Polls queue {@code name} and deserializes each JSON entry with
     * {@code reference}, preserving the order returned by the queue service.
     *
     * @param name      queue name
     * @param size      passed through to the queue service
     * @param reference fastjson type token for the element type
     * @return the deserialized entries; empty (never {@code null}) when nothing was polled
     */
    public <T> List<T> poll(String name, int size, TypeReference<T> reference){
        // The raw poll takes the locks; JSON parsing needs none of them.
        List<String> raw = poll(name, size);
        if(raw == null || raw.isEmpty()){
            return new ArrayList<>();
        }
        return raw.stream().map(itm -> JSON.parseObject(itm, reference)).collect(Collectors.toList());
    }

    /**
     * Polls raw entries from queue {@code name} and sleeps for up to
     * {@code emptySleepMs} milliseconds when nothing was returned.
     *
     * <p>The sleep happens without holding this instance's monitor or the poll lock.
     *
     * @param name         queue name
     * @param size         passed through to the queue service
     * @param emptySleepMs time to wait after an empty poll; {@code <= 0} disables waiting
     * @return whatever the queue service returned (may be {@code null} if the service returns {@code null})
     * @throws InterruptedException if interrupted while waiting
     */
    public List<String> poll(String name, int size, int emptySleepMs) throws InterruptedException {
        List<String> data = poll(name, size);
        boolean nothingPolled = data == null || data.isEmpty();
        if(nothingPolled && emptySleepMs > 0){
            wait(name, emptySleepMs);
        }
        return data;
    }

    /**
     * Polls raw entries from queue {@code name} while holding the global poll lock.
     *
     * @param name queue name
     * @param size passed through to the queue service
     * @return the result of {@code dbQueueService.poll(name, size)}
     */
    public synchronized List<String> poll(String name, int size){
        LOCKER_POLL.lock();
        try {
            return dbQueueService.poll(name, size);
        } finally {
            LOCKER_POLL.unlock();
        }
    }

    /**
     * Serializes each element of {@code data} to JSON and pushes the result to
     * queue {@code name}. A {@code null} or empty collection is ignored.
     *
     * @param data objects to enqueue
     * @param name queue name
     */
    public void push(Collection<Object> data, String name){
        if(data == null || data.isEmpty()){
            return;
        }
        // Serialize before delegating so the push lock is not held during JSON work.
        List<String> serialized = data.stream().map(JSON::toJSONString).collect(Collectors.toList());
        push(name, serialized);
    }

    /**
     * Pushes already-serialized entries to queue {@code name} under the global
     * push lock, then wakes consumers waiting on that queue. A {@code null} or
     * empty collection is ignored.
     *
     * @param name queue name
     * @param data entries to enqueue
     */
    public void push(String name, Collection<String> data){
        if(data == null || data.isEmpty()){
            return;
        }
        LOCKER_PUSH.lock();
        try {
            dbQueueService.push(name, data);
            notify(name);
        }finally {
            LOCKER_PUSH.unlock();
        }
    }

    /**
     * Cleans queue {@code name} through the queue service while both global locks
     * are held, then wakes the waiters of every queue. A {@code null} or empty
     * name is ignored.
     *
     * @param name queue name
     */
    public void clean(String name){
        if(Strings.isNullOrEmpty(name)){
            return;
        }
        // Same acquisition order as before (poll, then push); nested finally
        // blocks guarantee both locks are released without swallowing errors.
        LOCKER_POLL.lock();
        try {
            LOCKER_PUSH.lock();
            try {
                dbQueueService.clean(name);
                notifyAllLocker();
            }finally {
                LOCKER_PUSH.unlock();
            }
        }finally {
            LOCKER_POLL.unlock();
        }
    }

    /**
     * @param queueName queue name
     * @return the result of {@code dbQueueService.size(queueName)}
     */
    public Long size(String queueName){
        return dbQueueService.size(queueName);
    }

    /**
     * @return the result of {@code dbQueueService.size()}
     */
    public Long size(){
        return dbQueueService.size();
    }
}
